Implementation of the RecordNameStrategy for Protobuf, JSON and Avro(Generic and Specific) #1063
Hi, here is a full implementation of the RecordNameStrategy for Protobuf, JSON and Avro (Generic and Specific).
It follows this interface
For Protobuf, the solution was obvious, since the Protobuf schema contains the fullyQualifiedName of the message (which also appears within the SchemaRegistry API record).
When serializing JSON messages, I use the Go fullyQualifiedName (previously nothing was used) and add it to the schemaBytes. For example, if it's 'main.Person', 'main' goes into the namespace (of the schemaInfo) and 'Person' remains the name. On the consumer side, the consumer extracts these two fields, rebuilds the fullyQualifiedName, and can then identify the matching instance for deserializing the bytes. Since the schemaInfo is stored in the cache, lookups through the cache work the same way. The default MessageFactory simply returns a *map[string]interface{}.
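As a rough illustration of the split-and-rebuild step described above, here is a minimal sketch. The names goFullyQualifiedName, splitFQN, joinFQN and the Person type are hypothetical and are not taken from the PR; the sketch only assumes that the Go name has the form 'package.Type'.

```go
package main

import (
	"reflect"
	"strings"
)

// Person is a hypothetical message type used only for illustration.
type Person struct {
	Name string `json:"name"`
}

// goFullyQualifiedName returns "<package>.<Type>" for a value, e.g. "main.Person".
// Pointers are dereferenced first so that *Person and Person yield the same name.
// (Hypothetical helper, not the PR's actual function.)
func goFullyQualifiedName(v interface{}) string {
	t := reflect.TypeOf(v)
	if t == nil {
		return ""
	}
	for t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.String()
}

// splitFQN splits a name at its last dot into namespace and name,
// which is what the producer side would store in the schemaInfo.
func splitFQN(fqn string) (namespace, name string) {
	i := strings.LastIndex(fqn, ".")
	if i < 0 {
		return "", fqn
	}
	return fqn[:i], fqn[i+1:]
}

// joinFQN rebuilds the fullyQualifiedName on the consumer side.
func joinFQN(namespace, name string) string {
	if namespace == "" {
		return name
	}
	return namespace + "." + name
}
```

With these helpers, splitFQN(goFullyQualifiedName(&Person{})) yields the namespace and name to store, and joinFQN gives the consumer the key it can use to look up the matching type.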
For AvroGeneric, the process is similar to JSON since it doesn't have a specific fullyQualifiedName. In the default MessageFactory, I use 'github.com/linkedin/goavro' to deserialize the bytes without specifying an Avro schema.
For AvroSpecific, we use the schema's defined namespace if available; otherwise, we use the Go fullyQualifiedName. The default MessageFactory also uses 'github.com/linkedin/goavro'.
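The namespace fallback for AvroSpecific could look roughly like the sketch below. It is an assumption-laden illustration, not the PR's code: avroHeader and recordFullName are hypothetical names, and the sketch only reads the top-level "namespace" and "name" attributes of a record schema with encoding/json.

```go
package main

import (
	"encoding/json"
)

// avroHeader captures only the two top-level attributes needed here.
// (Hypothetical type, for illustration only.)
type avroHeader struct {
	Namespace string `json:"namespace"`
	Name      string `json:"name"`
}

// recordFullName returns "<namespace>.<name>" when the Avro schema defines
// a namespace, and otherwise falls back to the supplied Go name.
func recordFullName(schemaJSON string, goFallback string) (string, error) {
	var h avroHeader
	if err := json.Unmarshal([]byte(schemaJSON), &h); err != nil {
		return "", err
	}
	if h.Namespace != "" && h.Name != "" {
		return h.Namespace + "." + h.Name, nil
	}
	return goFallback, nil
}
```

For a schema declaring namespace 'com.example' and name 'Person', this returns 'com.example.Person'; for a schema without a namespace it returns whatever fallback the caller passes, such as 'main.Person'.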
In all four cases, the fullyQualifiedName is present in the schemaInfo, making it conveniently consultable within the SchemaRegistry API record. Additionally, since these schemas are stored in the cache, the fullyQualifiedName is always readily available.
For the cache, I had to add a new keyType called subjectOnlyID, which refers only to the 'id' of the SchemaRegistry API record. This is because it's the only piece of information we have available when receiving a message. Additionally, I added a new function to handle this case. In the func (s *BaseDeserializer) GetSchema(subject string, payload []byte) method, if the schema is empty, we use s.Client.GetByID(int(id)) (the case where only the id is present).
I made some minor modifications to the mock_schemaregistry_client without affecting the previous implementation. Additionally, I introduced 2-3 Protobuf and Avro messages with different namespaces, using the schemas (.avsc, .proto) already available, in order to minimize the modifications required for the mock.
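To make the ID-only lookup concrete, here is a minimal sketch of a cache keyed solely by schema id, with a fallback fetch on a miss. All names (schemaInfo, idCache, newIDCache, schemaIDFromPayload, getSchema) are hypothetical and do not mirror the PR's types. The only external assumption is the Confluent wire format: a zero magic byte followed by a 4-byte big-endian schema id.

```go
package main

import (
	"encoding/binary"
	"errors"
	"sync"
)

// schemaInfo is a hypothetical stand-in for the cached schema record.
type schemaInfo struct {
	Schema string
}

// idCache caches schemas by id only, since the id is all a consumer
// has when a message arrives.
type idCache struct {
	mu    sync.Mutex
	byID  map[int]schemaInfo
	fetch func(id int) (schemaInfo, error) // e.g. a registry lookup by id
}

func newIDCache(fetch func(id int) (schemaInfo, error)) *idCache {
	return &idCache{byID: make(map[int]schemaInfo), fetch: fetch}
}

// schemaIDFromPayload reads the schema id from the message header
// (magic byte 0, then a 4-byte big-endian id).
func schemaIDFromPayload(payload []byte) (int, error) {
	if len(payload) < 5 || payload[0] != 0 {
		return 0, errors.New("payload is not in the expected wire format")
	}
	return int(binary.BigEndian.Uint32(payload[1:5])), nil
}

// getSchema returns the cached schema for the payload's id and calls
// fetch only on a cache miss. The lock is held during fetch to keep the
// sketch simple; a real implementation may prefer finer-grained locking.
func (c *idCache) getSchema(payload []byte) (schemaInfo, error) {
	id, err := schemaIDFromPayload(payload)
	if err != nil {
		return schemaInfo{}, err
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if info, ok := c.byID[id]; ok {
		return info, nil
	}
	info, err := c.fetch(id)
	if err != nil {
		return schemaInfo{}, err
	}
	c.byID[id] = info
	return info, nil
}
```

In this sketch the fetch callback plays the role that s.Client.GetByID plays in the deserializer: it is consulted only when nothing is cached for the id.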
All the tests for the full implementation are present.
I've added a complete examples folder in 'examples/schemaregistry_example', and it's quite comprehensive. I'm excited to showcase the functionality.
I also added a few other things, including improvements to the topicNameStrategy (for Avro, for example) and more; I'll let you find out when going through the code.
No breaking change.
Based on my testing, it appears to be working well.
I look forward to your feedback and hope you find this implementation valuable.